import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;




/**
 * Flink DataSet example that joins a single hard-coded {@code Rating} with
 * {@code (String, Double)} pairs loaded from a CSV file, combines each matching
 * pair through {@code PointWeighter}, and prints the joined data set.
 *
 * <p>{@code Rating} and {@code PointWeighter} are declared elsewhere in the project.
 */
public class JoinwithJoinFunction {

    /**
     * Assembles the join pipeline and prints its result.
     *
     * @param args command-line arguments; not read by this example
     * @throws Exception propagated from any of the Flink calls made here
     */
    public static void main(String[] args) throws Exception {
        final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();

        // First join input: a data set containing one hand-built Rating.
        final DataSet<Rating> ratings = env.fromElements(new Rating("yuchi", "run", 50));

        // Second join input: flatjoin.csv, resolved relative to the JVM working directory.
        final StringBuilder rootBuilder = new StringBuilder(System.getProperty("user.dir"));
        rootBuilder.append("/dataset_api/Java");
        final String rootPath = rootBuilder.toString();
        final String csvUri = "file:///" + rootPath + "/" + "flatjoin.csv";
        final DataSet<Tuple2<String, Double>> weights =
                env.readCsvFile(csvUri).types(String.class, Double.class);

        // Join on Rating's "category" key against the tuple's first field ("f0");
        // every matching pair is handed to PointWeighter to build the output tuple.
        final DataSet<Tuple2<String, Double>> weightedRatings = ratings
                .join(weights)
                .where("category")
                .equalTo("f0")
                .with(new PointWeighter());

        weightedRatings.print();
    }
}




